package com.example.hadoopdemo.executor.recommend.movie;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * 计算推荐结果，优化{@link Step4}，支持分布式
 *
 * @author Ruison
 * @date 2021/12/9
 */
public class Step4Update {
    /**
     * step3_1和step3_2数据整合
     */
    public static class IntegrationData extends Mapper<LongWritable, Text, Text, Text> {
        // 数据标识 A:step3_1评分矩阵 or B:step3_2同现矩阵
        private String flag;

        @Override
        protected void setup(Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {
            // 设置数据标识
            FileSplit split = (FileSplit) context.getInputSplit();
            flag = split.getPath().getParent().getName();
        }

        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {
            String[] values = Recommend.DELIMITER.split(value.toString());
            if (flag.equals("step3_1")) {
                String[] v = values[1].split(":");
                String userId = v[0];
                String itemId = values[0];
                double preferenceScore = Double.parseDouble(v[1]);
                context.write(new Text(itemId), new Text("A:" + userId + "," + preferenceScore));
            } else if (flag.equals("step3_2")) {
                String[] v = values[0].split(":");
                String itemId1 = v[0];
                String itemId2 = v[1];
                int num = Integer.parseInt(values[1]);
                context.write(new Text(itemId1), new Text("B:" + itemId2 + "," + num));
            }
        }
    }

    /**
     * 矩阵乘法
     */
    public static class Aggregate extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {
            Map<String, Double> mapA = new HashMap<>(16);
            Map<String, Double> mapB = new HashMap<>(16);
            for (Text line : values) {
                String value = line.toString();
                String[] kv = Recommend.DELIMITER.split(value.substring(2));
                if (value.startsWith("A:")) {
                    mapA.put(kv[0], Double.parseDouble(kv[1]));
                } else if (value.startsWith("B:")) {
                    mapB.put(kv[0], Double.parseDouble(kv[1]));
                }
            }
            double result;
            for (Map.Entry<String, Double> bEntry : mapB.entrySet()) {
                for (Map.Entry<String, Double> aEntry : mapA.entrySet()) {
                    // 矩阵乘法相乘计算
                    result = bEntry.getValue() * aEntry.getValue();
                    context.write(new Text(aEntry.getKey()), new Text(bEntry.getKey() + "," + result));
                }
            }
        }
    }
}
